package com.wugui.datax.rpc.remoting.net.impl.netty.client;

import com.wugui.datax.rpc.remoting.invoker.XxlRpcInvokerFactory;
import com.wugui.datax.rpc.remoting.net.common.ConnectClient;
import com.wugui.datax.rpc.remoting.net.impl.netty.codec.NettyDecoder;
import com.wugui.datax.rpc.remoting.net.impl.netty.codec.NettyEncoder;
import com.wugui.datax.rpc.remoting.net.params.Beat;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcRequest;
import com.wugui.datax.rpc.remoting.net.params.XxlRpcResponse;
import com.wugui.datax.rpc.serialize.Serializer;
import com.wugui.datax.rpc.util.IpUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
 * Netty-based {@link ConnectClient}: owns one event loop group and one channel connected to a
 * single remote address.
 *
 * <p>Lifecycle: {@link #init} connects, {@link #send} writes requests, {@link #close} releases the
 * channel and the event loop group.
 *
 * @author xuxueli
 */
@Slf4j
public class NettyConnectClient extends ConnectClient {

  /** Event loop group created in {@link #init}; {@code null} until then. */
  private EventLoopGroup group;

  /** Channel to the remote server; {@code null} until {@link #init} has connected. */
  private Channel channel;

  /**
   * Connects to {@code address} and installs the codec/handler pipeline.
   *
   * <p>If the connect attempt fails, or the channel is not active once it returns, the event loop
   * group and channel are released through {@link #close()} before this method exits, so a failed
   * {@code init} does not leave event loop resources behind.
   *
   * @param address remote address, parsed by {@link IpUtil#parseIpPort(String)} into host and port
   * @param serializer serializer handed to the request encoder and response decoder
   * @param xxlRpcInvokerFactory invoker factory handed to the {@link NettyClientHandler}
   * @throws Exception if the address cannot be parsed or the connection cannot be established
   */
  @Override
  public void init(
      String address, final Serializer serializer, final XxlRpcInvokerFactory xxlRpcInvokerFactory)
      throws Exception {
    final NettyConnectClient thisClient = this;

    // parseIpPort yields [host (String), port (int)].
    Object[] array = IpUtil.parseIpPort(address);
    String host = (String) array[0];
    int port = (int) array[1];

    this.group = new NioEventLoopGroup();

    // From here on the group holds resources: release them on every non-success exit.
    boolean connected = false;
    try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap
          .group(group)
          .channel(NioSocketChannel.class)
          .handler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                  channel
                      .pipeline()
                      // Only the all-idle timer is armed (reader/writer idle are 0 = disabled);
                      // the resulting idle event is presumably what drives the heartbeat in
                      // NettyClientHandler -- verify against that class.
                      .addLast(
                          new IdleStateHandler(0, 0, Beat.BEAT_INTERVAL, TimeUnit.SECONDS))
                      .addLast(new NettyEncoder(XxlRpcRequest.class, serializer))
                      .addLast(new NettyDecoder(XxlRpcResponse.class, serializer))
                      .addLast(new NettyClientHandler(xxlRpcInvokerFactory, thisClient));
                }
              })
          .option(ChannelOption.TCP_NODELAY, true)
          .option(ChannelOption.SO_KEEPALIVE, true)
          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);

      // sync() blocks until the connect attempt finishes and rethrows its failure, if any.
      this.channel = bootstrap.connect(host, port).sync().channel();
      connected = isValidate();
    } finally {
      if (!connected) {
        // Covers both a thrown connect failure and a channel that is not active.
        close();
      }
    }

    if (!connected) {
      return;
    }

    log.debug(
        ">>>>>>>>>>> xxl-rpc netty client proxy, connect to server success at host:{}, port:{}",
        host,
        port);
  }

  /**
   * Reports whether this client can currently be used.
   *
   * @return {@code true} if a channel exists and is active, {@code false} otherwise
   */
  @Override
  public boolean isValidate() {
    if (this.channel != null) {
      return this.channel.isActive();
    }
    return false;
  }

  /**
   * Closes the channel if it is active and shuts down the event loop group if it is not already
   * shut down. Safe to call before {@link #init} or more than once.
   */
  @Override
  public void close() {
    if (this.channel != null && this.channel.isActive()) {
      this.channel.close(); // if this.channel.isOpen()
    }
    if (this.group != null && !this.group.isShutdown()) {
      this.group.shutdownGracefully();
    }
    log.debug(">>>>>>>>>>> xxl-rpc netty client close.");
  }

  /**
   * Writes and flushes the request on the channel, blocking until the write completes.
   *
   * @param xxlRpcRequest request to send
   * @throws Exception if the write fails or the waiting thread is interrupted; also a {@link
   *     NullPointerException} if no channel has been established by {@link #init}
   */
  @Override
  public void send(XxlRpcRequest xxlRpcRequest) throws Exception {
    this.channel.writeAndFlush(xxlRpcRequest).sync();
  }
}
